package com.fleapx.share.rabbit.confirm;

import com.fleapx.share.rabbit.Utils;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;

/**
 * RabbitMQ发送端确认机制
 *
 * @author zengchao
 * @date 2021-05-13 09:04:04
 */
@Slf4j
public class ConfirmDemo {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //同步确认
        //syncAck();
        //异步确认
        asyncAck();
    }

    /**
     * 同步确认
     *
     * @throws IOException
     * @throws TimeoutException
     */
    public static void syncAck() throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory factory = Utils.getFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //创建Exchange
        channel.exchangeDeclare("confirm.exchange", BuiltinExchangeType.DIRECT, true, false, new HashMap<>());
        //创建Queue
        channel.queueDeclare("confirm.queue", true, false, false, new HashMap<>());
        //绑定路由
        channel.queueBind("confirm.queue", "confirm.exchange", "confirm");

        channel.confirmSelect();
        channel.basicPublish("confirm.exchange", "confirm", new AMQP.BasicProperties(), "测试消息".getBytes(StandardCharsets.UTF_8));
        //同步确认
        boolean ack = channel.waitForConfirms();
        if (ack) {
            log.info("发送成功!");
        } else {
            log.info("发送失败!");
        }
        channel.close();
        connection.close();
    }

    /**
     * 异步确认
     *
     * @throws IOException
     * @throws TimeoutException
     */
    public static void asyncAck() throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory factory = Utils.getFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //创建Exchange
        channel.exchangeDeclare("confirm.exchange", BuiltinExchangeType.DIRECT, true, false, new HashMap<>());
        //创建Queue
        channel.queueDeclare("confirm.queue", true, false, false, new HashMap<>());
        //绑定路由
        channel.queueBind("confirm.queue", "confirm.exchange", "confirm");

        channel.confirmSelect();
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                log.info("ack : deliveryTag = {},multiple = {}", deliveryTag, multiple);
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                log.error("nack : deliveryTag = {},multiple = {}", deliveryTag, multiple);
            }
        });

        String msgTemplate = "测试消息[%d]";
        for (int i = 0; i < 10; i++) {
            channel.basicPublish("confirm.exchange", "confirm", new AMQP.BasicProperties(), String.format(msgTemplate, i).getBytes(StandardCharsets.UTF_8));
        }

    }
}
